package com.alinesno.cloud.base.message.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

/**
 * 消息发送组件 
 * @author LuoAnDong
 * @since 2018年12月17日 下午9:28:43
 */
@Component
public class KafkaMessageProducer {

	private static final Logger log = LoggerFactory.getLogger(KafkaMessageProducer.class);

	@Autowired
	private KafkaTemplate<String, String> kafkaTemplate;

	/**
	 * 发送消息
	 * 
	 * @param topic   消息主题
	 * @param message 消息内容
	 */
	public void sendMssage(String topic, String message) {
		ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
		future.addCallback(success -> log.debug("KafkaMessageProducer 发送消息成功！"),
				fail -> log.error("KafkaMessageProducer 发送消息失败！"));
	}
}